package com.bigfans.framework.kafka;

import com.bigfans.framework.utils.JsonUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hands Kafka consumer records to a listener method on a fixed-size thread pool.
 *
 * <p>The listener method and its target object are taken from the
 * {@link KafkaListenerBean} supplied at construction time. The listener method is
 * invoked reflectively with a single argument chosen from its declared parameter type:
 * <ul>
 *   <li>{@code String} - the record value is passed as-is;</li>
 *   <li>{@code ConsumerRecord} - the record itself is passed;</li>
 *   <li>any other type - the result of {@code JsonUtils.toObject(value, type)} is passed.</li>
 * </ul>
 *
 * <p>Failures while handling a record are logged and never propagated to the caller
 * of {@link #processMessage}.
 */
public class KafkaConsumerRecordProcessor {

    private static final Logger LOGGER =
            Logger.getLogger(KafkaConsumerRecordProcessor.class.getName());

    private final ExecutorService executorService;
    private final KafkaListenerBean listenerBean;

    /**
     * Creates a processor backed by a fixed thread pool.
     *
     * <p>The pool size is {@code listenerBean.getThreadCount()} when that value is
     * non-null, otherwise the number of available processors plus one.
     *
     * @param listenerBean describes the listener method, its target bean and the
     *                     optional thread count
     */
    public KafkaConsumerRecordProcessor(KafkaListenerBean listenerBean) {
        final int threadCount;
        if (listenerBean.getThreadCount() != null) {
            threadCount = listenerBean.getThreadCount();
        } else {
            threadCount = Runtime.getRuntime().availableProcessors() + 1;
        }
        this.executorService = Executors.newFixedThreadPool(threadCount);
        this.listenerBean = listenerBean;
    }

    /**
     * Submits the record to the thread pool; the listener is invoked on a pool thread.
     *
     * @param record the record to hand to the listener
     */
    public void process(ConsumerRecord<String, String> record) {
        executorService.execute(() -> processMessage(record, listenerBean));
    }

    /**
     * Invokes the listener method of {@code listenerBean} for {@code record} on the
     * calling thread.
     *
     * <p>Any exception raised while resolving the argument or invoking the listener is
     * logged together with the record's topic, partition and offset, and is not
     * rethrown. A listener method that does not declare exactly one parameter is
     * reported and skipped.
     *
     * @param record       the record to deliver
     * @param listenerBean supplies the listener method and the object to invoke it on
     */
    public void processMessage(ConsumerRecord<String, String> record, KafkaListenerBean listenerBean) {
        try {
            Method method = listenerBean.getListenerMethod();
            Object target = listenerBean.getBean();
            Class<?>[] parameterTypes = method.getParameterTypes();
            // Guard the index below: a zero-arg listener would otherwise raise an
            // ArrayIndexOutOfBoundsException instead of a meaningful diagnostic.
            if (parameterTypes.length != 1) {
                LOGGER.log(Level.SEVERE,
                        "Kafka listener method " + method
                                + " must declare exactly one parameter but declares "
                                + parameterTypes.length + "; skipping record ["
                                + describe(record) + "]");
                return;
            }

            Class<?> paramClass = parameterTypes[0];
            if (paramClass.equals(String.class)) {
                method.invoke(target, record.value());
            } else if (paramClass.equals(ConsumerRecord.class)) {
                method.invoke(target, record);
            } else {
                Object paramObject = JsonUtils.toObject(record.value(), paramClass);
                method.invoke(target, paramObject);
            }
        } catch (InvocationTargetException e) {
            // The interesting failure is the one thrown by the listener itself,
            // not the reflection wrapper around it.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            logFailure(record, cause);
        } catch (Exception e) {
            logFailure(record, e);
        }
    }

    /**
     * Initiates an orderly shutdown of the thread pool. Calling it more than once
     * has no additional effect.
     */
    public void shutdown() {
        if (!executorService.isShutdown()) {
            executorService.shutdown();
        }
    }

    /** Logs a processing failure together with the coordinates of the affected record. */
    private static void logFailure(ConsumerRecord<String, String> record, Throwable failure) {
        LOGGER.log(Level.SEVERE,
                "Failed to process Kafka record [" + describe(record) + "]", failure);
    }

    /** Builds a short description of the record without including its payload. */
    private static String describe(ConsumerRecord<String, String> record) {
        if (record == null) {
            return "null record";
        }
        return "topic=" + record.topic()
                + ", partition=" + record.partition()
                + ", offset=" + record.offset();
    }
}
